/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.netty.util;

import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Checks periodically whether watched threads are alive and runs a task when a watched thread
 * dies.
 * <p>
 * This class starts a daemon thread to check the state of the threads being watched and to invoke
 * their associated {@link Runnable}s.  When there is no thread to watch (i.e. all threads are
 * dead), the daemon thread will terminate itself, and a new daemon thread will be started again
 * when a new watch is added.
 * </p>
 *
 * @deprecated will be removed in the next major release
 */
@Deprecated
public final class ThreadDeathWatcher {

  private static final InternalLogger logger = InternalLoggerFactory
      .getInstance(ThreadDeathWatcher.class);
  // visible for testing
  static final ThreadFactory threadFactory;

  // Use an MPMC queue because isEmpty() may be called from multiple threads, which an MPSC queue
  // implementation may not allow to happen concurrently.
  private static final Queue<Entry> pendingEntries = new ConcurrentLinkedQueue<Entry>();
  private static final Watcher watcher = new Watcher();
  // true while a watcher thread is (or is about to be) running; flipped only via CAS.
  private static final AtomicBoolean started = new AtomicBoolean();
  // The most recently started watcher thread; published only after Thread.start() was called.
  private static volatile Thread watcherThread;

  static {
    String poolName = "threadDeathWatcher";
    String serviceThreadPrefix = SystemPropertyUtil.get("io.netty.serviceThreadPrefix");
    if (!StringUtil.isNullOrEmpty(serviceThreadPrefix)) {
      poolName = serviceThreadPrefix + poolName;
    }
    // Because the ThreadDeathWatcher is a singleton, tasks submitted to it can come from arbitrary
    // threads and this can trigger the creation of a thread from arbitrary thread groups; for this
    // reason, the thread factory must not be sticky about its thread group.
    threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY, null);
  }

  /**
   * Schedules the specified {@code task} to run when the specified {@code thread} dies.
   *
   * @param thread the {@link Thread} to watch
   * @param task the {@link Runnable} to run when the {@code thread} dies
   * @throws NullPointerException if {@code thread} or {@code task} is {@code null}
   * @throws IllegalArgumentException if the specified {@code thread} is not alive
   */
  public static void watch(Thread thread, Runnable task) {
    if (thread == null) {
      throw new NullPointerException("thread");
    }
    if (task == null) {
      throw new NullPointerException("task");
    }
    if (!thread.isAlive()) {
      throw new IllegalArgumentException("thread must be alive.");
    }

    schedule(thread, task, true);
  }

  /**
   * Cancels the task scheduled via {@link #watch(Thread, Runnable)}.
   *
   * @param thread the {@link Thread} that was passed to {@link #watch(Thread, Runnable)}
   * @param task the {@link Runnable} that was passed to {@link #watch(Thread, Runnable)}
   * @throws NullPointerException if {@code thread} or {@code task} is {@code null}
   */
  public static void unwatch(Thread thread, Runnable task) {
    if (thread == null) {
      throw new NullPointerException("thread");
    }
    if (task == null) {
      throw new NullPointerException("task");
    }

    schedule(thread, task, false);
  }

  /**
   * Enqueues a watch ({@code isWatch == true}) or unwatch ({@code isWatch == false}) request and
   * starts a watcher thread if none is currently marked as started.
   */
  private static void schedule(Thread thread, Runnable task, boolean isWatch) {
    pendingEntries.add(new Entry(thread, task, isWatch));

    // Only the caller that wins the CAS creates and starts the watcher thread.
    if (started.compareAndSet(false, true)) {
      final Thread watcherThread = threadFactory.newThread(watcher);
      // Set the context class loader to null so that we do not create class loader leaks by
      // holding a strong reference to the inherited class loader.
      // See:
      // - https://github.com/netty/netty/issues/7290
      // - https://bugs.openjdk.java.net/browse/JDK-7008595
      AccessController.doPrivileged(new PrivilegedAction<Void>() {
        @Override
        public Void run() {
          watcherThread.setContextClassLoader(null);
          return null;
        }
      });

      watcherThread.start();
      // Publish the thread only after it has been started, so that awaitInactivity() never joins
      // a thread that has not been started yet.
      ThreadDeathWatcher.watcherThread = watcherThread;
    }
  }

  /**
   * Waits until the thread of this watcher has no threads to watch and terminates itself. Because a
   * new watcher thread will be started again on {@link #watch(Thread, Runnable)}, this operation is
   * only useful when you want to ensure that the watcher thread is terminated
   * <strong>after</strong> your application is shut down and there's no chance of calling
   * {@link #watch(Thread, Runnable)} afterwards.
   * <p>
   * A {@code timeout} that is zero or negative does not wait at all; the method just reports
   * whether the watcher thread is currently terminated.
   * </p>
   *
   * @param timeout the maximum time to wait
   * @param unit the unit of {@code timeout}
   * @return {@code true} if and only if the watcher thread has been terminated
   * @throws NullPointerException if {@code unit} is {@code null}
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public static boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
    if (unit == null) {
      throw new NullPointerException("unit");
    }

    Thread watcherThread = ThreadDeathWatcher.watcherThread;
    if (watcherThread == null) {
      // No watcher thread has been published yet, so there is nothing to wait for.
      return true;
    }

    // Do not use Thread.join(unit.toMillis(timeout)): a timeout that converts to 0 ms (zero, or
    // any sub-millisecond value) would turn into join(0), which waits forever.
    // TimeUnit.timedJoin() does not wait for non-positive timeouts and hands the sub-millisecond
    // remainder to Thread.join(long, int).
    unit.timedJoin(watcherThread, timeout);
    return !watcherThread.isAlive();
  }

  private ThreadDeathWatcher() {
  }

  /**
   * The body of the watcher thread: drains {@code pendingEntries} into its private list of
   * watchees, runs the tasks of dead threads, sleeps, and exits once nothing is left to watch.
   */
  private static final class Watcher implements Runnable {

    // Accessed only from run() and the helpers it calls.
    private final List<Entry> watchees = new ArrayList<Entry>();

    @Override
    public void run() {
      for (; ; ) {
        fetchWatchees();
        notifyWatchees();

        // Try once again just in case notifyWatchees() triggered watch() or unwatch().
        fetchWatchees();
        notifyWatchees();

        try {
          Thread.sleep(1000);
        } catch (InterruptedException ignored) {
          // Ignore the interrupt; do not terminate until all tasks are run.
        }

        if (watchees.isEmpty() && pendingEntries.isEmpty()) {

          // Mark the current worker thread as stopped.
          // The following CAS must always succeed and must be uncontended,
          // because only one watcher thread should be running at the same time.
          boolean stopped = started.compareAndSet(true, false);
          assert stopped;

          // Check if there are pending entries added by watch() while we did the CAS above.
          if (pendingEntries.isEmpty()) {
            // A) watch() was not invoked and thus there's nothing to handle
            //    -> safe to terminate because there's nothing left to do
            // B) a new watcher thread started and handled them all
            //    -> safe to terminate because the new watcher thread will take care of the rest
            break;
          }

          // There are pending entries again, added by watch()
          if (!started.compareAndSet(false, true)) {
            // watch() started a new watcher thread and set 'started' to true.
            // -> terminate this thread so that the new watcher reads from pendingEntries exclusively.
            break;
          }

          // watch() added an entry, but this worker was faster to set 'started' to true.
          // i.e. a new watcher thread was not started
          // -> keep this thread alive to handle the newly added entries.
        }
      }
    }

    /**
     * Drains {@code pendingEntries}: watch requests are appended to {@code watchees}, unwatch
     * requests remove the first equal entry from it.
     */
    private void fetchWatchees() {
      for (; ; ) {
        Entry e = pendingEntries.poll();
        if (e == null) {
          break;
        }

        if (e.isWatch) {
          watchees.add(e);
        } else {
          // Entry.equals() ignores isWatch, so this finds the matching watch entry.
          watchees.remove(e);
        }
      }
    }

    /**
     * Removes every watchee whose thread is no longer alive and runs its task; a {@link Throwable}
     * raised by a task is logged and does not stop the remaining tasks from running.
     */
    private void notifyWatchees() {
      List<Entry> watchees = this.watchees;
      // The index advances only when nothing was removed, because removal shifts later elements.
      for (int i = 0; i < watchees.size(); ) {
        Entry e = watchees.get(i);
        if (!e.thread.isAlive()) {
          watchees.remove(i);
          try {
            e.task.run();
          } catch (Throwable t) {
            logger.warn("Thread death watcher task raised an exception:", t);
          }
        } else {
          i++;
        }
      }
    }
  }

  /**
   * A queued watch or unwatch request for a (thread, task) pair.
   * <p>
   * Equality and hash code are based on the identity of {@code thread} and {@code task} only;
   * {@code isWatch} is not compared, so an unwatch request equals the watch request it cancels.
   * </p>
   */
  private static final class Entry {

    final Thread thread;
    final Runnable task;
    final boolean isWatch;

    Entry(Thread thread, Runnable task, boolean isWatch) {
      this.thread = thread;
      this.task = task;
      this.isWatch = isWatch;
    }

    @Override
    public int hashCode() {
      return thread.hashCode() ^ task.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj == this) {
        return true;
      }

      if (!(obj instanceof Entry)) {
        return false;
      }

      Entry that = (Entry) obj;
      return thread == that.thread && task == that.task;
    }
  }
}
